/*
 * Copyright (C) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package com.github.florent37.camerafragment.sample.rxbus;

import com.github.florent37.camerafragment.sample.bean.BusData;

import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.Subject;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import ohos.app.AbilityContext;

/**
 * RxBus异步分发事件
 *
 * @since 2021-03-18
 */
public class RxBus {
    private static volatile RxBus defaultInstance;
    private static AbilityContext sAbility;
    private static final int SUBSCRIBER_CODE = -1;
    private Map<Class, List<Disposable>> subscriptionsByEventType = new HashMap<>();
    private Map<Class, List<SubscriberMethod>> subscriberMethodByEventType = new HashMap<>();
    private Map<Object, List<Class>> eventTypesBySubscriber = new HashMap<>();
    private Subject<Object> bus;

    private RxBus() {
        this.bus = PublishSubject.create().toSerialized();
    }

    /**
     * 获取RxBus分发器
     *
     * @param ability 页面
     * @return RxBus 分发器
     */
    public static RxBus get(AbilityContext ability) {
        sAbility = ability;
        RxBus rxBus = defaultInstance;
        if (defaultInstance == null) {
            synchronized (RxBus.class) {
                rxBus = defaultInstance;
                if (defaultInstance == null) {
                    rxBus = new RxBus();
                    defaultInstance = rxBus;
                }
            }
        }
        return rxBus;
    }

    /**
     * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
     *
     * @param eventType 事件类型
     * @return Flowable 可操作性的
     */
    private <T> Flowable<T> toObservable(Class<T> eventType) {
        return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(eventType);
    }

    /**
     * 根据传递的code和 eventType 类型返回特定类型(eventType)的 被观察者
     *
     * @param code 线程码
     * @param eventType 类型
     * @param <T> 泛型
     * @return Flowable 可操作的
     */
    private <T> Flowable<T> toObservable(final int code, final Class<T> eventType) {
        return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(Message.class)
            .filter(new Predicate<Message>() {
                @Override
                public boolean test(Message o) throws Exception {
                    return o.getCode() == code && eventType.isInstance(o.getObject());
                }
            }).map(new Function<Message, Object>() {
                @Override
                public Object apply(Message o) throws Exception {
                    return o.getObject();
                }
            }).cast(eventType);
    }

    /**
     * 注册
     *
     * @param subscriber 订阅者
     */
    public void register(Object subscriber) {
        Class<?> subClass = subscriber.getClass();
        Method[] methods = subClass.getDeclaredMethods();
        for (Method method : methods) {
            if (method.isAnnotationPresent(Subscribe.class)) { // 获得参数类型
                Class[] parameterType = method.getParameterTypes();
                if (parameterType.length == 1) { // 参数不为空 且参数个数为1
                    Class eventType = parameterType[0];
                    addEventTypeToMap(subscriber, eventType);
                    Subscribe sub = method.getAnnotation(Subscribe.class);
                    int code = sub.code();
                    ThreadMode threadMode = sub.threadMode();

                    SubscriberMethod subscriberMethod = new SubscriberMethod();
                    subscriberMethod.setSubscriber(subscriber);
                    subscriberMethod.setMethod(method);
                    subscriberMethod.setEventType(eventType);
                    subscriberMethod.setCode(code);
                    subscriberMethod.setThreadMode(threadMode);
                    addSubscriberToMap(eventType, subscriberMethod);
                    addSubscriber(subscriberMethod);
                } else if (parameterType == null || parameterType.length == 0) {
                    Class eventType = BusData.class;

                    addEventTypeToMap(subscriber, eventType);
                    Subscribe sub = method.getAnnotation(Subscribe.class);
                    int code = sub.code();
                    ThreadMode threadMode = sub.threadMode();

                    SubscriberMethod subscriberMethod = new SubscriberMethod();
                    subscriberMethod.setSubscriber(subscriber);
                    subscriberMethod.setMethod(method);
                    subscriberMethod.setEventType(eventType);
                    subscriberMethod.setCode(code);
                    subscriberMethod.setThreadMode(threadMode);
                    addSubscriberToMap(eventType, subscriberMethod);
                    addSubscriber(subscriberMethod);
                }
            }
        }
    }

    /**
     * 将event的类型以订阅中subscriber为key保存到map里
     *
     * @param subscriber 订阅者
     * @param eventType event类型
     */
    private void addEventTypeToMap(Object subscriber, Class eventType) {
        List<Class> eventTypes = eventTypesBySubscriber.get(subscriber);
        if (eventTypes == null) {
            eventTypes = new ArrayList<>();
            eventTypesBySubscriber.put(subscriber, eventTypes);
        }

        if (!eventTypes.contains(eventType)) {
            eventTypes.add(eventType);
        }
    }

    /**
     * 将注解方法信息以event类型为key保存到map中
     *
     * @param eventType event类型
     * @param subscriberMethod 注解方法信息
     */
    private void addSubscriberToMap(Class eventType, SubscriberMethod subscriberMethod) {
        List<SubscriberMethod> subscriberMethods = subscriberMethodByEventType.get(eventType);
        if (subscriberMethods == null) {
            subscriberMethods = new ArrayList<>();
            subscriberMethodByEventType.put(eventType, subscriberMethods);
        }

        if (!subscriberMethods.contains(subscriberMethod)) {
            subscriberMethods.add(subscriberMethod);
        }
    }

    /**
     * 将订阅事件以event类型为key保存到map,用于取消订阅时用
     *
     * @param eventType event类型
     * @param disposable 订阅事件
     */
    private void addSubscriptionToMap(Class eventType, Disposable disposable) {
        List<Disposable> disposables = subscriptionsByEventType.get(eventType);
        if (disposables == null) {
            disposables = new ArrayList<>();
            subscriptionsByEventType.put(eventType, disposables);
        }

        if (!disposables.contains(disposable)) {
            disposables.add(disposable);
        }
    }

    /**
     * 用RxJava添加订阅者
     *
     * @param subscriberMethod 订阅方法
     */
    private void addSubscriber(final SubscriberMethod subscriberMethod) {
        Flowable flowable;
        if (subscriberMethod.getCode() == SUBSCRIBER_CODE) {
            flowable = toObservable(subscriberMethod.getEventType());
        } else {
            flowable = toObservable(subscriberMethod.getCode(), subscriberMethod.getEventType());
        }
        Disposable subscription = postToObservable(flowable, subscriberMethod)
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object obj) throws Exception {
                    callEvent(subscriberMethod, obj);
                }
            });
        addSubscriptionToMap(subscriberMethod.getSubscriber().getClass(), subscription);
    }

    /**
     * 用于处理订阅事件在那个线程中执行
     *
     * @param observable 可操作性的
     * @param subscriberMethod 订阅方法
     * @return Flowable 可操作性的
     */
    private Flowable postToObservable(Flowable observable, SubscriberMethod subscriberMethod) {
        Scheduler scheduler;
        switch (subscriberMethod.getThreadMode()) {
            case MAIN:
                scheduler = HarmonySchedulers.mainThread(sAbility);
                break;

            case NEW_THREAD:
                scheduler = Schedulers.newThread();
                break;

            case CURRENT_THREAD:
                scheduler = Schedulers.trampoline();
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.getThreadMode());
        }
        return observable.observeOn(scheduler);
    }

    /**
     * 回调到订阅者的方法中
     *
     * @param method 订阅方法
     * @param object 订阅对象
     */
    private void callEvent(SubscriberMethod method, Object object) {
        Class eventClass = object.getClass();
        List<SubscriberMethod> methods = subscriberMethodByEventType.get(eventClass);
        if (methods == null || methods.size() == 0) {
            return;
        }
        for (SubscriberMethod subscriberMethod : methods) {
            Subscribe sub = subscriberMethod.getMethod().getAnnotation(Subscribe.class);
            if (sub.code() == method.getCode() && method.getSubscriber().equals(subscriberMethod.getSubscriber())
                && method.getMethod().equals(subscriberMethod.getMethod())) {
                subscriberMethod.invoke(object);
            }
        }
    }

    /**
     * 取消注册
     *
     * @param subscriber 订阅对象
     */
    public void unRegister(Object subscriber) {
        List<Class> subscribedTypes = eventTypesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unSubscribeByEventType(subscriber.getClass());
                unSubscribeMethodByEventType(subscriber, eventType);
            }
            eventTypesBySubscriber.remove(subscriber);
        }
    }

    /**
     * subscriptions unsubscribe
     *
     * @param eventType 字节类
     */
    private void unSubscribeByEventType(Class eventType) {
        List<Disposable> disposables = subscriptionsByEventType.get(eventType);
        if (disposables != null) {
            Iterator<Disposable> iterator = disposables.iterator();
            while (iterator.hasNext()) {
                Disposable disposable = iterator.next();
                if (disposable != null && !disposable.isDisposed()) {
                    disposable.dispose();
                    iterator.remove();
                }
            }
        }
    }

    /**
     * 移除subscriber对应的subscriberMethods
     *
     * @param subscriber 订阅对象
     * @param eventType 父类字节
     */
    private void unSubscribeMethodByEventType(Object subscriber, Class eventType) {
        List<SubscriberMethod> subscriberMethods = subscriberMethodByEventType.get(eventType);
        if (subscriberMethods != null) {
            Iterator<SubscriberMethod> iterator = subscriberMethods.iterator();
            while (iterator.hasNext()) {
                SubscriberMethod subscriberMethod = iterator.next();
                if (subscriberMethod.getSubscriber().equals(subscriber)) {
                    iterator.remove();
                }
            }
        }
    }

    /**
     * 发送事件
     *
     * @param code 发送码
     * @param obj 发送事件载体
     */
    public void send(int code, Object obj) {
        bus.onNext(new Message(code, obj));
    }

    /**
     * 发送事件
     *
     * @param obj 发送事件载体
     */
    public void send(Object obj) {
        bus.onNext(obj);
    }

    /**
     * 发送事件
     *
     * @param code 发送码
     */
    public void send(int code) {
        bus.onNext(new Message(code, new BusData()));
    }

    /**
     * 消息队列
     *
     * @since 2021-03-18
     */

    private class Message {
        private int code;
        private Object object;

        private Message() {
        }

        private Message(int code, Object obj) {
            this.code = code;
            this.object = obj;
        }

        private int getCode() {
            return code;
        }

        public void setCode(int code) {
            this.code = code;
        }

        private Object getObject() {
            return object;
        }

        public void setObject(Object object) {
            this.object = object;
        }
    }
}
